Flink 指南 您所在的位置:网站首页 flink hudi读取原理 Flink 指南

Flink 指南

2023-06-01 15:33| 来源: 网络整理| 查看: 265

Flink 指南

本指南提供了使用 Flink SQL 操作 Hudi 的文档。阅读本指南,您可以学习如何快速开始使用 Flink 读写 Hudi,同时对配置和任务优化有更深入的了解:

快速开始 :通过阅读 快速开始,你可以快速开始使用 Flink sql client 去读写 Hudi配置 :对于 Flink 配置,使用 $FLINK_HOME/conf/flink-conf.yaml 来配置。 对于任意一个作业的配置,通过表参数来设置写功能 :Flink 支持多种写功能用例,例如 离线批量导入,全量接增量,Changelog 模式,Insert 模式 和 离线 Compaction查询功能 :Flink 支持多种查询功能用例,例如 Hive 查询, Presto 查询优化 :针对 Flink 读写 Hudi 的操作,本指南提供了一些优化建议,例如 内存优化 和 写入限流快速开始​安装​

我们推荐使用 Flink Sql Client 来读写 Hudi,因为 Flink sql client 对于 SQL 用户来说更容易上手。

步骤1 下载 Flink jar​

我们推荐使用 Flink-1.12.x 来读写 Hudi。 你可以按照 Flink 安装文档 的指导来安装 Flink。 hudi-flink-bundle.jar 使用的是 scala 2.11,所以我们推荐 Flink-1.12.x 配合 scala 2.11 来使用。

步骤2 启动 Flink 集群​

在 Hadoop 环境下启动 standalone 的 Flink 集群。 在你启动 Flink 集群前,我们推荐先配置如下参数:

在 $FLINK_HOME/conf/flink-conf.yaml 中添加配置:taskmanager.numberOfTaskSlots: 4在 $FLINK_HOME/conf/flink-conf.yaml 中,根据数据量大小和集群大小来添加其他的 Flink 配置在 $FLINK_HOME/conf/workers 中添加4核 localhost 来保证我们本地集群中有4个 workers

启动集群:

# HADOOP_HOME 是 Hadoop 的根目录。export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`# 启动 Flink standalone 集群./bin/start-cluster.sh步骤3 启动 Flink SQL client​

Hudi 将 Flink 板块单独打包为 hudi-flink-bundle.jar,该 Jar 包需要在启动的时候加载。 你可以在 hudi-source-dir/packaging/hudi-flink-bundle 下手动的打包这个 Jar 包,或者从 Apache Official Repository 中下载。

启动 Flink SQL Client:

# HADOOP_HOME 是 Hadoop 的根目录。export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`./bin/sql-client.sh embedded -j .../hudi-flink-bundle_2.1?-*.*.*.jar shell小提示:为了兼容大部分的对象存储,我们推荐使用 Hadoop 2.9 x+的 Hadoop 版本flink-parquet 和 flink-avro 格式已经打包在 hudi-flink-bundle.jar 中了

根据下面的不同功能来设置表名,存储路径和操作类型。 Flink SQL Client 是逐行执行 SQL 的。

插入数据​

先创建一个 Flink Hudi 表,然后在通过下面的 VALUES 语句往该表中插入数据。

-- 为了更直观的显示结果,推荐把 CLI 的输出结果模式设置为 tableau。set execution.result-mode=tableau;CREATE TABLE t1( uuid VARCHAR(20), name VARCHAR(10), age INT, ts TIMESTAMP(3), `partition` VARCHAR(20))PARTITIONED BY (`partition`)WITH ( 'connector' = 'hudi', 'path' = 'schema://base-path', 'table.type' = 'MERGE_ON_READ' -- 创建一个 MERGE_ON_READ类型的表,默认是 COPY_ON_ERITE);-- 使用 values 语句插入数据INSERT INTO t1 VALUES ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'), ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'), ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'), ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'), ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'), ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'), ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'), ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');查询数据​-- 从 Hudi 表中查询select * from t1;

该查询语句提供的是 快照读(Snapshot Querying)。 如果想了解更多关于表类型和查询类型的介绍,可以参考文档 表类型和查询类型

更新数据​

数据的更新和插入数据类似:

-- 这条语句会更新 key 为 'id1' 的记录insert into t1 values ('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');

需要注意的是:现在使用的存储类型为 Append。通常我们都是使用 apennd 模式,除非你是第一次创建这个表。 再次 查询数据 就会显示更新后的结果。 每一次的插入操作都会在时间轴上生成一个带时间戳的新的 commit,在元数据字段 _hoodie_commit_time 和同一 _hoodie_record_key 的 age字段中查看更新。

流式查询​

Hudi Flink 也有能力来查询从指定时间戳开始的流式记录集合。 该功能可以使用Hudi的流式查询,只需要提供一个查询开始的时间戳就可以完成查询。如果我们需要的 是指定时间戳后的所有数据,我们就不需要指定结束时间。

CREATE TABLE t1( uuid VARCHAR(20), name VARCHAR(10), age INT, ts TIMESTAMP(3), `partition` VARCHAR(20))PARTITIONED BY (`partition`)WITH ( 'connector' = 'hudi', 'path' = 'oss://vvr-daily/hudi/t1', 'table.type' = 'MERGE_ON_READ', 'read.streaming.enabled' = 'true', -- 该参数开启流式查询 'read.streaming.start-commit' = '20210316134557' -- 指定开始的时间戳 'read.streaming.check-interval' = '4' -- 指定检查新的commit的周期,默认是60秒);-- 开启流式的查询select * from t1;

上述的查询会查询出 read.streaming.start-commit 时间戳后的所有数据。该功能的特殊在于可以同时在流和批的 pipeline 上执行。

删除数据​

在 流式查询 中使用数据时,Hudi Flink 源还可以接受来自底层数据源的更改日志,然后可以按行级别应用更新和删除。所以,你可以在 Hudi 上同步各种 RDBMS 的近实时快照。

Flink 配置​

在使用 Flink 前,你需要在 $FLINK_HOME/conf/flink-conf.yaml 中设置一些全局的 Flink 配置。

并行度​名称默认值类型描述taskmanager.numberOfTaskSlots1Integer单个 TaskManager 可以运行的并行 task 数。我们建议将该值设置为 > 4,实际值需根据数据量进行设置parallelism.default1Integer当用户为指定算子并行度时,会使用这个并行度(默认值是1)。例如 write.bucket_assign.tasks 没有设置,就会使用这个默认值内存​名称默认值类型描述jobmanager.memory.process.size(none)MemorySizeJobManager 的总进程内存大小。代表 JobManager JVM 进程消耗的所有内存,包括总的 Flink 内存、JVM 元空间和 JVM 开销taskmanager.memory.task.heap.size(none)MemorySizeTaskExecutor 的堆内存大小。这是为写缓存保留的 JVM 堆内存大小taskmanager.memory.managed.size(none)MemorySize这是内存管理器管理的堆外内存的大小,用于排序和 RocksDB 状态后端。如果选择 RocksDB 作为状态后端,则需要设置此内存Checkpoint​名称默认值类型描述execution.checkpointing.interval(none)Duration设置该值的方式为 execution.checkpointing.interval = 150000ms,其中150000ms = 2.5min。 设置这个参数等同于开启了 Checkpointstate.backend(none)String保存状态信息的状态后端。 我们推荐设置状态后端为 rocksdb :state.backend: rocksdbstate.backend.rocksdb.localdir(none)StringRocksDB 存储状态信息的路径state.checkpoints.dir(none)StringCheckpoint 的默认路径,用于在支持 Flink 的文件系统中存储检查点的数据文件和元数据。存储路径必须可从所有参与进程/节点(即所有 TaskManager 和 JobManager)访问,如 HDFS 和 OSS 路径state.backend.incrementalfalseBoolean选项状态后端是否创建增量检查点。对于增量检查点,只存储与前一个检查点的差异,而不是完整的检查点状态。如果存储状态设置为 rocksdb,建议打开这个选项表参数​

对于单个作业的配置,我们可以在 Flink SQL 语句的 WITH中设置。 所以,作业级别的配置如下:

内存​note

我们在内存调优的时候需要先关注 TaskManager 的数量和 内存 配置,以及 write task 的并发(write.tasks: 4 的值),确认每个 write task 能够分配到足够的内存, 再考虑以下相关的内存参数设置。

名称说明默认值备注write.task.max.size一个 write task 的最大可用内存。 默认为 1024MB1024D当前预留给 write buffer 的内存为 write.task.max.size - compaction.max_memory。 当 write task 的内存 buffer 达到该阈值后会将内存里最大的 buffer flushwrite.batch.sizeFlink 的 write task 为了提高写数据效率,会按照写 bucket 提前缓存数据,每个 bucket 的数据在内存达到阈值之前会一直缓存在内存中,当阈值达到会把数据 buffer 传递给 Hudi 的 writer 执行写操作。默认为 64MB64D推荐使用默认值write.log_block.sizeHudi 的 log writer 在收到 write task 的数据后不会马上 flush 数据,writer 是以 LogBlock 为单位往磁盘刷数据的,在 LogBlock 攒够之前 records 会以序列化字节的形式缓存在 writer 内部。默认为 128MB128推荐使用默认值write.merge.max_memory当表的类型为 COPY_ON_WRITE,Hudi 会合并增量数据和 base file 中的数据。增量的数据会缓存在内存的 map 结构里,这个 map 是可溢写的,这个参数控制了 map 可以使用的堆内存大小。 默认为 100MB100推荐使用默认值compaction.max_memory同 write.merge.max_memory 类似,只是发生在压缩时。默认为 100MB100如果是在线 compaction,资源充足时可以开大些,比如 1024MB并行度​名称说明默认值备注write.taskswriter 的并发,每个 writer 顺序写 1~N 个 buckets。默认为 44增加这个并发,对小文件个数没影响write.bucket_assign.tasksbucket assigner 的并发,默认使用 Flink 默认并发 parallelism.defaultparallelism.default增加并发同时增加了并发写的 bucekt 数,也就变相增加了小文件(小 bucket)数write.index_boostrap.tasksIndex bootstrap 算子的并发,增加并发可以加快 bootstrap 阶段的效率,bootstrap 阶段会阻塞 Checkpoint,因此需要设置多一些的 Checkpoint 失败容忍次数。 默认使用 Flink 默认并发 parallelism.defaultparallelism.default只在 index.bootsrap.enabled 为 true 时生效read.tasks读算子的并发(batch 和 stream。默认为 44compaction.tasks在线 compaction 的并发。默认为 1010在线 compaction 比较耗费资源,建议使用 离线 compactionCompaction​note

这些参数都只服务于在线 compaction。

note

通过设置compaction.async.enabled = false 来关闭在线 compaction,但是 compaction.schedule.enable 仍然建议开启。之后通过离线 compaction 直接执行在线 compaction 产生的 compaction plan。

名称说明默认值备注compaction.schedule.enabled是否阶段性生成 compaction plantrue建议开启,即使 compaction.async.enabled = falsecompaction.async.enabled是否开启异步压缩,MOR 表时默认开启true通过关闭此参数来关闭 在线 compactioncompaction.trigger.strategy压缩策略num_commits可选择的策略有 num_commits:达到 N 个 delta commits 时触发 compaction; time_elapsed:距离上次 compaction 超过 N 秒触发 compaction ; num_and_time:NUM_COMMITS 和 TIME_ELAPSED 同时满足; num_or_time:NUM_COMMITS 或者 TIME_ELAPSED 中一个满足compaction.delta_commits默认策略,5 个 delta commits 触发一次压缩5--compaction.delta_seconds默认 1 小时触发一次压缩3600--compaction.max_memorycompaction 的 hashMap 可用内存。 默认值为 100MB100资源够用的话,建议调整到 1024MBcompaction.target_io每个 compaction plan 的 IO 内存上限 (读和写)。 默认值为 5GB5120离线 compaction 的默认值是 500GB内存优化​MOR​把 Flink 的状态后端设置为 rocksdb (默认的 in memory 状态后端非常的消耗内存)如果内存足够,compaction.max_memory 可以设置得更大些(默认为 100MB,可以调大到 1024MB)关注 taskManager 分配给每个 write task 的内存,保证每个 write task 能够分配到 write.task.max.size 所配置的内存大小。 比如 taskManager 的内存是 4GB, 运行了 2 个 StreamWriteFunction,那每个 write function 能分到 2GB,尽量预留一些缓存。因为网络缓存,taskManager 上其他类型的 task (比如 BucketAssignFunction)也会消耗一些内存需要关注 compaction 的内存变化。 compaction.max_memory 控制了每个 compaction task 读 log 时可以利用的内存大小。compaction.tasks 控制了 compaction task 的并发COW​把 Flink 的状态后端设置为 rocksdb (默认的 in memory 状态后端非常的消耗内存)同时调大 write.task.max.size 和 write.merge.max_memory (默认值分别是 1024MB 和 100MB,可以调整为 2014MB 和 1024MB)关注 taskManager 分配给每个 write task 的内存,保证每个 write task 能够分配到 write.task.max.size 所配置的内存大小。 比如 taskManager 的内存是 4GB, 运行了 2 个 StreamWriteFunction,那每个 write function 能分到 2GB,尽量预留一些缓存。因为网络缓存,taskManager 上其他类型的 task (比如 BucketAssignFunction)也会消耗一些内存离线批量导入​

针对存量数据导入的需求,如果存量数据来源于其他数据源,可以使用离线批量导入功能(bulk_insert),快速将存量数据导入 Hudi。

note

bulk_insert 省去了 avro 的序列化以及数据的 merge 过程,后续也不会再有去重操作。所以,数据的唯一性需要自己来保证。

note

bulk_insert 在 batch execution mode 模式下执行更加高效。 batch execution mode 模式默认会按照 partition path 排序输入消息再写入 Hudi, 避免 file handle 频繁切换导致性能下降。

note

bulk_insert 的 write tasks 的并发是通过参数 write.tasks 来指定,并发的数量会影响到小文件的数量,理论上,bulk_insert 的 write tasks 的并发数就是划分的 bucket 数, 当然每个 bucket 在写到文件大小上限(parquet 120 MB)的时候会回滚到新的文件句柄,所以最后:写文件数量 >= write.bucket_assign.tasks。

参数​名称Required默认值备注write.operationtrueupsert开启 bulk_insert 功能write.tasksfalse4bulk_insert write tasks 的并发,最后的文件数 >= write.bucket_assign.taskswrite.bulk_insert.shuffle_by_partitionfalsetrue是否将数据按照 partition 字段 shuffle 后,再通过 write task 写入,开启该参数将减少小文件的数量,但是有数据倾斜的风险write.bulk_insert.sort_by_partitionfalsetrue是否将数据线按照 partition 字段排序后,再通过 write task 写入,当一个 write task 写多个 partition时,开启可以减少小文件数量write.sormoryfalse128sort 算子的可用 managed memory(单位 MB)。默认为 128 MB全量接增量​

针对全量数据导入后,接增量的需求。如果已经有全量的离线 Hudi 表,需要接上实时写入,并且保证数据不重复,可以开启 全量接增量(index bootstrap)功能。

note

如果觉得流程冗长,可以在写入全量数据的时候资源调大直接走流模式写,全量走完接新数据再将资源调小(或者开启 写入限流 )。

参数​名称Required默认值备注index.bootstrap.enabledtruefalse开启 index bootstrap 索引加载后,会将已存在的 Hudi 表的数据一次性加载到 state 中index.partition.regexfalse*设置正则表达式进行分区筛选,默认为加载全部分区使用流程​CREATE TABLE 创建和 Hudi 表对应的语句,注意 table.type 必须正确设置 index.bootstrap.enabled = true 开启索引加载功能在 flink-conf.yaml 中设置 Checkpoint 失败容忍 :execution.checkpointing.tolerable-failed-checkpoints = n(取决于checkpoint 调度次数)等待第一次 Checkpoint 完成,表示索引加载完成索引加载完成后可以退出并保存 savepoint(也可以直接用 externalized checkpoint)重启任务,将 index.bootstrap.enable 设置为 false,参数配置到合适的大小note索引加载是阻塞式,所以在索引加载过程中 Checkpoint 无法完成索引加载由数据流触发,需要确保每个 partition 都至少有1条数据,即上游 source 有数据进来索引加载为并发加载,根据数据量大小加载时间不同,可以在log中搜索 finish loading the index under partition 和 Load record form file 日志内容来观察索引加载的进第一次 Checkpoint 成功就表示索引已经加载完成,后续从 Checkpoint 恢复时无需再次加载索引Changelog 模式​

针对使用 Hudi 保留消息的所有变更(I / -U / U / D),之后接上 Flink 引擎的有状态计算实现全链路近实时数仓生产(增量计算)的需求,Hudi 的 MOR 表 通过行存原生支持保留消息的所有变更(format 层面的集成),通过流读 MOR 表可以消费到所有的变更记录。

参数​名称Required默认值备注changelog.enabledfalsefalse默认是关闭状态,即 UPSERT 语义,所有的消息仅保证最后一条合并消息,中间的变更可能会被 merge 掉;改成 true 支持消费所有变更note

批(快照)读仍然会合并所有的中间结果,不管 format 是否已存储中间状态。

note

设置 changelog.enable 为 true 后,中间的变更也只是 best effort:异步的压缩任务会将中间变更合并成 1 条,所以如果流读消费不够及时,被压缩后 只能读到最后一条记录。当然,通过调整压缩的缓存时间可以预留一定的时间缓冲给 reader,比如调整压缩的两个参数:compaction.delta_commits and compaction.delta_seconds。

Insert 模式​

当前 Hudi 对于 Insert 模式 默认会采用小文件策略:MOR 会追加写 avro log 文件,COW 会不断合并之前的 parquet 文件(并且增量的数据会去重),这样会导致性能下降。

如果想关闭文件合并,可以设置 write.insert.deduplicate 为 false。 关闭后,不会有任何的去重行为,每次 flush 都是直接写独立的 parquet(MOR 表也会直接写 parquet)。

参数​名称Required默认值备注write.insert.deduplicatefalsetrue默认 Insert 模式 会去重,关闭后每次 flush 都会直接写独立的 parquetHive 查询​打包​

第一步是打包 hudi-flink-bundle_2.11-0.9.0.jar。 hudi-flink-bundle module pom.xml 默认将 Hive 相关的依赖 scope 设置为 provided, 如果想打入 Hive 的依赖,需要显示指定 Profile 为 flink-bundle-shade-hive。执行以下命令打入 Hive 依赖:

# Maven 打包命令mvn install -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive2# 如果是 hive3 需要使用 profile -Pflink-bundle-shade-hive3# 如果是 hive1 需要使用 profile -Pflink-bundle-shade-hive1 note

Hive1.x 现在只能实现同步 metadata 到 Hive,而无法使用 Hive 查询,如需查询可使用 Spark 查询 Hive 外表的方法查询。

note

使用 -Pflink-bundle-shade-hive x,需要修改 Profile 中 Hive 的版本为集群对应版本(只需修改 Profile 里的 Hive 版本)。修改位置为 packaging/hudi-flink-bundle/pom.xml 最下面的对应 Profile 段,找到后修改 Profile 中的 Hive 版本为对应版本即可。

Hive 环境准备​

第一步是将 hudi-hadoop-mr-bundle.jar 放到 Hive中。 在 Hive 的根目录下创建 auxlib/ 文件夹,把 hudi-hadoop-mr-bundle-0.x.x-SNAPSHOT.jar 移入到 auxlib。 hudi-hadoop-mr-bundle-0.x.x-SNAPSHOT.jar 可以在 packaging/hudi-hadoop-mr-bundle/target 目录下拷贝。

第二步是开启 Hive 相关的服务。Flink SQL Client 远程连接 Hive 的时候,要求 Hive 的 hive metastore and hiveserver2 两个服务都开启且需要记住端口号。 服务开启的方法如下:

# 启动 Hive metastore 和 hiveserver2nohup ./bin/hive --service metastore &nohup ./bin/hive --service hiveserver2 &# 每次更新了 /auxlib 下的 jar 包都需要重启上述两个服务Hive 配置模版​

Flink hive sync 现在支持两种 hive sync mode,分别是 hms 和 jdbc。其中 hms 模式只需要配置 Metastore uris;而 jdbc 模式需要同时 配置 JDBC 属性 和 Metastore uris,具体配置模版如下:

-- hms mode 配置模版CREATE TABLE t1( uuid VARCHAR(20), name VARCHAR(10), age INT, ts TIMESTAMP(3), `partition` VARCHAR(20))PARTITIONED BY (`partition`)WITH ( 'connector' = 'hudi', 'path' = 'oss://vvr-daily/hudi/t1', 'table.type' = 'COPY_ON_WRITE', --MERGE_ON_READ 方式在没生成 parquet 文件前,Hive 不会有输出 'hive_sync.enable' = 'true', -- Required。开启 Hive 同步功能 'hive_sync.mode' = 'hms' -- Required。将 hive sync mode 设置为 hms, 默认 jdbc 'hive_sync.metastore.uris' = 'thrift://ip:9083' -- Required。metastore 的端口);-- jdbc mode 配置模版CREATE TABLE t1( uuid VARCHAR(20), name VARCHAR(10), age INT, ts TIMESTAMP(3), `partition` VARCHAR(20))PARTITIONED BY (`partition`)WITH ( 'connector' = 'hudi', 'path' = 'oss://vvr-daily/hudi/t1', 'table.type' = 'COPY_ON_WRITE', --MERGE_ON_READ 方式在没生成 parquet 文件前,Hive 不会有输出 'hive_sync.enable' = 'true', -- Required。开启 Hive 同步功能 'hive_sync.mode' = 'hms' -- Required。将 hive sync mode 设置为 hms, 默认 jdbc 'hive_sync.metastore.uris' = 'thrift://ip:9083' -- Required。metastore 的端口 'hive_sync.jdbc_url'='jdbc:hive2://ip:10000', -- required。hiveServer 的端口 'hive_sync.table'='t1', -- required。hive 新建的表名 'hive_sync.db'='testDB', -- required。hive 新建的数据库名 'hive_sync.username'='root', -- required。HMS 用户名 'hive_sync.password'='your password' -- required。HMS 密码);Hive 查询​

使用 beeline 查询时,需要手动设置:

set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;与其他包冲突​

当 Flink lib/ 下有 flink-sql-connector-hive-xxx.jar 时,会出现hive包冲突,解决方法是在 Install 时,另外再指定一个 Profile:-Pinclude-flink-sql-connector-hive, 同时删除 Flink lib/ 下的 flink-sql-connector-hive-xxx.jar,新的 Install 的命令如下:

# Maven 打包命令mvn install -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive2 -Pinclude-flink-sql-connector-hivePresto 查询​Hive 同步​

首先,参考 Hive 查询 过程,完成 Hive 元数据的同步。

Presto 环境配置​根据 Presto 配置文档 来配置 Presto。根据下面的配置,在 /presto-server-0.2xxx/etc/catalog/hive.properties 中配置 Hive catalog:connector.name=hive-hadoop2hive.metastore.uri=thrift://xxx.xxx.xxx.xxx:9083hive.config.resources=.../hadoop-2.x/etc/hadoop/core-site.xml,.../hadoop-2.x/etc/hadoop/hdfs-site.xmlPresto 查询​

通过 presto-cli 连接 Hive metastore 开启查询。 presto-cli 的设置参考 presto-cli 配置:

# 连接 presto server 的命令./presto --server xxx.xxx.xxx.xxx:9999 --catalog hive --schema defaultnotePresto-server-0.2445 版本较低,在查 MOR 表的 rt 表时,会出现包冲突,正在解决中当 Presto-server-xxx 的版本 < 0.233 时,hudi-presto-bundle.jar 需要手动导入到 {presto_install_dir}/plugin/hive-hadoop2/ 中。离线 Compaction​

MERGE_ON_READ 表的 compaction 默认是打开的,策略是 5 个 delta commits 执行一次压缩。因为压缩操作比较耗费内存,和写流程放在同一个 pipeline,在数据量比较大 的时候(10w+/s),容易干扰写流程,此时采用离线定时任务的方式执行 compaction 任务更稳定。

note

一个 compaction 任务的执行包括两部分:1. 生成 compaction plan; 2. 执行对应的 compaction plan。其中,第一步生成 compaction plan 的过程推荐由写任务定时触发, 写参数 compaction.schedule.enable 默认开启。

离线 compaction 需要手动执行 Flink 任务,程序入口为: hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar : org.apache.hudi.sink.compact.HoodieFlinkCompactor

# 命令行执行方式./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink-bundle_2.11-0.9.0.jar --path hdfs://xxx:9000/table参数​名称Required默认值备注--pathfrue--目标 Hudi 表的路径--compaction-max-memoryfalse100压缩时 log 数据的索引 HashMap 的内存大小,默认 100MB,内存足够时,可以调大--schedulefalsefalse是否要执行 schedule compaction 的操作,当写流程还在持续写入表数据的时候,开启这个参数有丢失查询数据的风险,所以开启该参数一定要保证当前没有任务往表里写数据,写任务的 compaction plan 默认是一直 schedule 的,除非手动关闭(默认 5 个 delta commits 一次 compaction)--seqfalseLIFO执行压缩任务的顺序,默认是从最新的 compaction plan 开始执行,可选值:LIFO :从最新的 plan 开始执行; FIFO:从最老的 plan 开始执行写入限流​

针对将全量数据(百亿数量级)和增量先同步到 Kafka,再通过 Flink 流式消费的方式将库表数据直接导成 Hudi 表的需求,因为直接消费全量数据:量大 (吞吐高)、乱序严重(写入的 partition 随机),会导致写入性能退化,出现吞吐毛刺等情况,这时候可以开启限速参数,保证流量平稳写入。

参数​名称Required默认值备注write.rate.limitfalse0默认关闭限速从这开始下一步?​

您也可以通过自己构建hudi来快速开始, 并在spark-shell命令中使用--jars /packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.1?-*.*.*-SNAPSHOT.jar, 而不是--packages org.apache.hudi:hudi-spark3-bundle_2.12:0.8.0

这里我们使用Spark演示了Hudi的功能。但是,Hudi可以支持多种存储类型/视图,并且可以从Hive,Spark,Presto等查询引擎中查询Hudi数据集。 我们制作了一个基于Docker设置、所有依赖系统都在本地运行的演示视频, 我们建议您复制相同的设置然后按照这里的步骤自己运行这个演示。 另外,如果您正在寻找将现有数据迁移到Hudi的方法,请参考迁移指南。



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有